package ssm

//Cheat sheet: escaping regex metacharacters when calling String.split
//  dot:       string.split("[.]")
//  pipe:      string.split("\\|")
//  asterisk:  string.split("\\*")
//  backslash: string.split("\\\\")
//  brackets:  string.split("\\[\\]")
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import java.time
import java.util.Properties

object OfflineWareHouse {

  /**
   * Entry point: runs the log-cleaning job 8 times,
   * pausing 5 seconds before each run.
   */
  def main(args: Array[String]): Unit = {
    for (_ <- 1 to 8) {
      Thread.sleep(5000)
      println("执行一次")
      logger() // side-effecting method: keep explicit ()
    }
  }

  /**
   * Reads a tab-separated web-access log, strips quotes, keeps only
   * successful (HTTP 200) requests that carry a referer, parses the
   * request line and the referer's search keyword, and prints the
   * resulting detail DataFrame and its row count.
   *
   * Expected input columns (tab-separated):
   *   userIp, local_time, url ("METHOD path PROTOCOL"), status, referer
   * where referer looks like "searchEngine?key=value" — TODO confirm
   * against the log generator.
   */
  def logger(): Unit = {
    val file = "file:///C:/Users/Lenovo/Desktop/Working/Python/data/OfflineWareHouse/随即日志生成器20.log"
    val spark = SparkSession.builder().master("local[1]").appName("DataClearingLog").getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    // textFile already yields one record per line, so no extra newline split
    // is needed. Strip all double quotes, split on tabs, and drop malformed
    // lines that would otherwise crash the positional accesses below.
    val rdd = sc.textFile(file)
      .map(_.replaceAll("\"", ""))
      .map(_.split("\t"))
      .filter(_.length >= 5) // guard: short lines previously threw ArrayIndexOutOfBounds
      .map(f => Row(f(0).trim, f(1).trim, f(2).trim, f(3).trim, f(4).trim))

    val schema = StructType(Array(
      StructField("userIp", StringType),
      StructField("local_time", StringType),
      StructField("url", StringType),
      StructField("status", StringType),
      StructField("referer", StringType)
    ))
    val orgDF = spark.createDataFrame(rdd, schema)

    // Keep only successful requests that actually have a referer.
    val d1 = orgDF
      .filter(_.getAs[String]("status") == "200")
      .filter(_.getAs[String]("referer") != "-")
    //d1.show(5)

    val dfDetail = d1.map { row =>
      // "url" column holds the full request line: "METHOD path PROTOCOL".
      val urlParts = row.getAs[String]("url").split(" ")

      // referer is "searchEngine?key=name"; guard both splits so a referer
      // without '?' or '=' yields an empty keyword instead of an exception.
      val refererParts = row.getAs[String]("referer").split("\\?")
      val searchEngine = refererParts(0)
      val keyword =
        if (refererParts.length > 1) {
          val kv = refererParts(1).split("=")
          if (kv.length > 1) kv(1) else ""
        } else ""

      // NOTE(review): the original wrapped these in a Map whose keys were
      // never present, so every getOrElse returned its default — the map
      // was dead indirection and has been removed.
      (
        row.getAs[String]("userIp"),
        row.getAs[String]("local_time"),
        urlParts(0), // method
        urlParts(1), // request path
        urlParts(2), // protocol
        row.getAs[String]("status"),
        searchEngine,
        keyword
      )
    }.toDF()
    // dfDetail.show(5)
    val dfDetailRDD = dfDetail.rdd

    val detailSchema = StructType(Array(
      StructField("userIp", StringType),
      StructField("local_time", StringType),
      StructField("method", StringType),
      StructField("url", StringType),
      StructField("agreement", StringType),
      StructField("status", StringType),
      StructField("SE", StringType),
      StructField("name", StringType)
    ))

    val detailDF = spark.createDataFrame(dfDetailRDD, detailSchema)
    detailDF.show()
    println(detailDF.count())
    // Already persisted once; writing again with "overwrite" would replace the table.
//    val prop = new Properties()
//    prop.put("user", "root")
//    prop.put("password", "z9633352")
//    prop.put("driver","com.mysql.jdbc.Driver")
//    val url = "jdbc:mysql://localhost:3306/python_db"
//    println("开始写入数据库")
//    detailDF.write.mode("overwrite").jdbc(url,"WareHouse",prop)
//    println("完成写入数据库")

  }
}

